Source code for hysop.tools.parameters

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Light classes to handle parameters for classes construction.

.. currentmodule:: hysop.tools

* :class:`~MPIParams`
* :class:`~CartesianDiscretization`

"""

import hashlib
from collections import namedtuple

from hysop.constants import HYSOP_DEFAULT_TASK_ID, BoundaryCondition
from hysop.tools.htypes import first_not_None, check_instance
from hysop.tools.hash import hash_communicator
from hysop.tools.numpywrappers import npw
from hysop.core.mpi import main_comm, main_rank, MPI


class MPIParams(
    namedtuple("MPIParams", ["comm", "size", "task_id", "rank", "on_task"])
):
    """
    Struct to save mpi parameters :

    - comm : parent mpi communicator (default = main_comm)
    - size : number of processes in comm, or -1 when comm is not usable
    - task_id : id of the task that owns this object
      (default = HYSOP_DEFAULT_TASK_ID)
    - rank : rank of the current process in comm
    - on_task : true if the task_id of the object corresponds
      to the task_id of the current process.

    This struct is useful for operators : each operator has
    a MPIParams attribute to save its mpi settings.

    Equality and hashing only take ``task_id``, ``on_task`` and the
    *identity* of ``comm`` into account; ``size`` and ``rank`` are derived
    from ``comm`` and are ignored.

    Examples
    ---------

    op = SomeOperator(..., task_id=1)
    if op.is_on_task():
       ...

    'is_on_task' will return MPIParams.on_task value for op
    and tell if the current operator belongs to the current process
    mpi task.
    """

    def __new__(
        cls, comm=main_comm, task_id=HYSOP_DEFAULT_TASK_ID, rank=main_rank, on_task=True
    ):
        """
        Build the parameter tuple.

        The ``rank`` argument is accepted for interface compatibility only:
        the stored rank (and size) are always recomputed below from ``comm``
        and ``on_task``.
        """
        if not on_task:
            # This process does not take part in the task: null everything.
            rank = MPI.PROC_NULL
            comm = MPI.COMM_NULL
            size = -1
        elif comm != MPI.COMM_NULL:
            rank = comm.Get_rank()
            size = comm.Get_size()
        else:
            # On task but without a valid communicator.
            rank = MPI.UNDEFINED
            size = -1
        return super().__new__(cls, comm, size, task_id, rank, on_task)

    def diff(self, other):
        """
        Return a dict describing how ``self`` and ``other`` differ.

        Keys are taken among ``"task_id"``, ``"on_task"`` and ``"comm"``;
        each value is the pair ``(value in self, value in other)``.
        An empty dict means both objects compare equal.
        """
        d = {}
        if self.task_id != other.task_id:
            d["task_id"] = (self.task_id, other.task_id)
        if self.on_task != other.on_task:
            # Store both values, like the other entries (the previous code
            # stored the result of the comparison, which was always True).
            d["on_task"] = (self.on_task, other.on_task)
        if self.comm is not other.comm:
            d["comm"] = (self.comm, other.comm)
        return d

    def __eq__(self, other):
        """Compare task_id, on_task and communicator identity."""
        if self.__class__ != other.__class__:
            return NotImplemented
        return (
            self.task_id == other.task_id
            and self.on_task == other.on_task
            and self.comm is other.comm
        )

    def __ne__(self, other):
        """Negation of ``__eq__`` that propagates ``NotImplemented``."""
        result = self.__eq__(other)
        if result is NotImplemented:
            # Let Python try the reflected operation instead of negating
            # the NotImplemented sentinel.
            return result
        return not result

    def __hash__(self):
        """Hash consistent with ``__eq__`` (task_id, on_task, comm identity)."""
        h = hashlib.sha1()
        h.update(str(self.task_id).encode("utf-8"))
        h.update(str(self.on_task).encode("utf-8"))
        return hash(h.hexdigest()) ^ id(self.comm)
class CartesianDiscretization(
    namedtuple(
        "CartesianDiscretization",
        ["resolution", "ghosts", "lboundaries", "rboundaries"],
    )
):
    """
    A struct to handle discretization parameters:

    - a resolution (either a list of int or a numpy array of int)
      resolution is GRID_RESOLUTION.
      GLOBAL_RESOLUTION is GRID_RESOLUTION + PERIODICITY.
    - number of points in the ghost-layer. One value per direction, list or
      array. Default = None (ie. no ghosts).
    - global boundary conditions that should be prescribed on the left and
      the right of the box shaped domain for each axis. They are left to
      None unless given explicitly, or set to periodic boundary conditions
      everywhere when ``default_boundaries`` is True.
    """

    def __new__(
        cls,
        resolution,
        ghosts=None,
        lboundaries=None,
        rboundaries=None,
        default_boundaries=False,
    ):
        """
        Validate and freeze the discretization parameters.

        ``lboundaries`` and ``rboundaries`` must be given together or not at
        all. ``default_boundaries=True`` is only allowed when neither is
        given, and fills both with ``BoundaryCondition.PERIODIC``.
        Raises AssertionError when these constraints, or the ghosts
        constraints (same size as resolution, non negative), are violated.
        """
        # Boundaries come in pairs: both sides specified or neither.
        assert not ((lboundaries is None) ^ (rboundaries is None))

        resolution = npw.asdimarray(resolution)
        if ghosts is not None:
            ghosts = npw.asintegerarray(ghosts)
            msg = "Dimensions of resolution and ghosts parameters"
            msg += " are not complient."
            assert ghosts.size == resolution.size, msg
            assert all(ghosts >= 0)
        else:
            ghosts = npw.integer_zeros(resolution.size)

        if default_boundaries:
            assert lboundaries is None
            assert rboundaries is None
            lboundaries = npw.empty(shape=(resolution.size,), dtype=object)
            lboundaries[...] = BoundaryCondition.PERIODIC
            rboundaries = lboundaries.copy()

        check_instance(
            lboundaries,
            npw.ndarray,
            dtype=object,
            size=resolution.size,
            values=BoundaryCondition,
            allow_none=True,
        )
        check_instance(
            rboundaries,
            npw.ndarray,
            dtype=object,
            size=resolution.size,
            values=BoundaryCondition,
            allow_none=True,
        )

        # The stored arrays are made read-only.
        npw.set_readonly(resolution, ghosts)
        if lboundaries is not None:
            npw.set_readonly(lboundaries, rboundaries)

        return super().__new__(cls, resolution, ghosts, lboundaries, rboundaries)

    @property
    def boundaries(self):
        """Left and right boundary conditions as a tuple."""
        if self.lboundaries is None:
            raise AttributeError("boundaries have not been specified")
        return (self.lboundaries, self.rboundaries)

    @property
    def periodicity(self):
        """Boolean mask of the axes whose left boundary is periodic."""
        if (self.lboundaries is None) or (self.rboundaries is None):
            raise AttributeError("boundaries have not been specified")
        return self.lboundaries == BoundaryCondition.PERIODIC

    @property
    def grid_resolution(self):
        """Effective grid resolution given by user."""
        return self.resolution

    @property
    def global_resolution(self):
        """
        Logical grid resolution (grid_resolution + periodicity).
        Can only be fetched if boundaries have been specified.
        """
        return self.grid_resolution + self.periodicity

    def __eq__(self, other):
        """Element-wise comparison of resolution, ghosts and boundaries."""
        if self.__class__ != other.__class__:
            return NotImplemented
        if (self.lboundaries is None) ^ (other.lboundaries is None):
            return False
        # Arrays of different shapes can never match; checking first avoids
        # relying on numpy's behaviour for mismatched element-wise '=='.
        if self.resolution.shape != other.resolution.shape:
            return False
        match = (self.resolution == other.resolution).all()
        match &= (self.ghosts == other.ghosts).all()
        if self.lboundaries is not None:
            # Only compare boundaries when present: None == None is a plain
            # bool and has no .all().
            match &= (self.lboundaries == other.lboundaries).all()
            match &= (self.rboundaries == other.rboundaries).all()
        return bool(match)

    def __ne__(self, other):
        """Negation of ``__eq__`` that propagates ``NotImplemented``."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __str__(self):
        """Multi-line, human readable description of the discretization."""
        s = "Cartesian discretization:"
        s += f"\n *resolution: {self.resolution}"
        s += f"\n *ghosts: {self.ghosts}"
        # Explicit None test: the truth value of a multi-element array is
        # ambiguous and raises ValueError.
        if self.lboundaries is not None:
            s += f"\n *lboundaries: {self.lboundaries.tolist()}"
            s += f"\n *rboundaries: {self.rboundaries.tolist()}"
        else:
            s += "\n *lboundaries: None"
            s += "\n *rboundaries: None"
        return s

    def __hash__(self):
        """Hash built from the raw bytes of resolution/ghosts and boundaries."""
        h = hashlib.sha1()
        h.update(self.resolution.view(npw.uint8))
        h.update(self.ghosts.view(npw.uint8))
        if self.lboundaries is not None:
            h.update(
                str(hash(tuple(int(bd) for bd in self.lboundaries))).encode("utf-8")
            )
        if self.rboundaries is not None:
            h.update(
                str(hash(tuple(int(bd) for bd in self.rboundaries))).encode("utf-8")
            )
        return hash(h.hexdigest())